Merge pull request #318 from dsander/threaded-background-workers

Provide a new threaded background worker

Dominik Sander committed 10 years ago · commit 577b95fbd7
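
This PR folds the three separate background processes (the rufus-scheduler loop, the Twitter stream watcher, and the delayed_job worker) into a single `rails runner` process that hosts each of them on its own thread. A condensed sketch of the pattern, paraphrasing the new bin/threaded.rb added below (not the literal file):

    # One thread per long-running component; a shared stop routine tears all three down.
    threads = []
    threads << Thread.new { (@stream    = TwitterStream.new).run     }
    threads << Thread.new { (@scheduler = HuginnScheduler.new).run!  }
    threads << Thread.new { (@dj        = Delayed::Worker.new).start }

    trap('TERM') { @scheduler.stop; @dj.stop; @stream.stop }  # same handler for INT
    threads.each(&:join)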

+ 9 - 7
Procfile

@@ -1,11 +1,13 @@
-# Procfile for development:
+# Procfile for development using the new threaded worker (scheduler, twitter stream and delayed job)
 web: bundle exec rails server
-schedule: bundle exec rails runner bin/schedule.rb
-twitter: bundle exec rails runner bin/twitter_stream.rb
-dj: bundle exec script/delayed_job run
+jobs: bundle exec rails runner bin/threaded.rb

 # Possible Profile configuration for production:
 # web: bundle exec unicorn -c config/unicorn/production.rb
-# schedule: bundle exec rails runner bin/schedule.rb
-# twitter: bundle exec rails runner bin/twitter_stream.rb
-# dj: bundle exec script/delayed_job run
+# jobs: bundle exec rails runner bin/threaded.rb
+
+# Old version with separate processes (use this if you have issues with the threaded version)
+#web: bundle exec rails server
+#schedule: bundle exec rails runner bin/schedule.rb
+#twitter: bundle exec rails runner bin/twitter_stream.rb
+#dj: bundle exec script/delayed_job run

+ 0 - 82
bin/schedule.rb

@@ -11,87 +11,5 @@ unless defined?(Rails)
   exit 1
 end

-require 'rufus/scheduler'
-
-class HuginnScheduler
-  attr_accessor :mutex
-
-  def run_schedule(time)
-    with_mutex do
-      puts "Queuing schedule for #{time}"
-      Agent.delay.run_schedule(time)
-    end
-  end
-
-  def propagate!
-    with_mutex do
-      puts "Queuing event propagation"
-      Agent.delay.receive!
-    end
-  end
-
-  def cleanup_expired_events!
-    with_mutex do
-      puts "Running event cleanup"
-      Event.delay.cleanup_expired!
-    end
-  end
-
-  def with_mutex
-    ActiveRecord::Base.connection_pool.with_connection do
-      mutex.synchronize do
-        yield
-      end
-    end
-  end
-
-  def run!
-    self.mutex = Mutex.new
-
-    rufus_scheduler = Rufus::Scheduler.new
-
-    tzinfo_friendly_timezone = ActiveSupport::TimeZone::MAPPING[ENV['TIMEZONE'].present? ? ENV['TIMEZONE'] : "Pacific Time (US & Canada)"]
-
-    # Schedule event propagation.
-
-    rufus_scheduler.every '1m' do
-      propagate!
-    end
-
-    # Schedule event cleanup.
-
-    rufus_scheduler.cron "0 0 * * * " + tzinfo_friendly_timezone do
-      cleanup_expired_events!
-    end
-
-    # Schedule repeating events.
-
-    %w[1m 2m 5m 10m 30m 1h 2h 5h 12h 1d 2d 7d].each do |schedule|
-      rufus_scheduler.every schedule do
-        run_schedule "every_#{schedule}"
-      end
-    end
-
-    # Schedule events for specific times.
-
-    # Times are assumed to be in PST for now.  Can store a user#timezone later.
-    24.times do |hour|
-      rufus_scheduler.cron "0 #{hour} * * * " + tzinfo_friendly_timezone do
-        if hour == 0
-          run_schedule "midnight"
-        elsif hour < 12
-          run_schedule "#{hour}am"
-        elsif hour == 12
-          run_schedule "noon"
-        else
-          run_schedule "#{hour - 12}pm"
-        end
-      end
-    end
-
-    rufus_scheduler.join
-  end
-end
-
 scheduler = HuginnScheduler.new
 scheduler.run!

+ 57 - 0
bin/threaded.rb

@@ -0,0 +1,57 @@
+require 'thread'
+
+def stop
+  puts 'Exiting...'
+  @scheduler.stop
+  @dj.stop
+  @stream.stop
+end
+
+def safely(&block)
+  begin
+    yield block
+  rescue StandardError => e
+    STDERR.puts "\nException #{e.message}:\n#{e.backtrace.join("\n")}\n\n"
+    STDERR.puts "Terminating myself ..."
+    stop
+  end
+end
+
+threads = []
+threads << Thread.new do
+  safely do
+    @stream = TwitterStream.new
+    @stream.run
+    puts "Twitter stream stopped ..."
+  end
+end
+
+threads << Thread.new do
+  safely do
+    @scheduler = HuginnScheduler.new
+    @scheduler.run!
+    puts "Scheduler stopped ..."
+  end
+end
+
+threads << Thread.new do
+  safely do
+    require 'delayed/command'
+    @dj = Delayed::Worker.new
+    @dj.start
+    puts "Delayed job stopped ..."
+  end
+end
+
+# We need to wait a bit to let delayed_job set its traps so we can override them
+sleep 0.5
+
+trap('TERM') do
+  stop
+end
+
+trap('INT') do
+  stop
+end
+
+threads.collect { |t| t.join }
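
The `sleep 0.5` above matters because delayed_job installs its own TERM/INT handlers when the worker starts, and Ruby's `trap` simply replaces whatever handler was registered last. Waiting briefly lets the worker thread register its traps first, so the traps set afterwards in the main thread win. A minimal illustration of that semantics (plain Ruby, unrelated to Huginn):

    trap('INT') { puts 'first handler'  }
    trap('INT') { puts 'second handler' }  # the later trap replaces the earlier one
    Process.kill('INT', Process.pid)
    sleep 0.1                              # prints "second handler" only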

+ 1 - 112
bin/twitter_stream.rb

@@ -12,115 +12,4 @@ unless defined?(Rails)
   exit 1
 end

-require 'cgi'
-require 'json'
-require 'twitter/json_stream'
-require 'em-http-request'
-require 'pp'
-
-def stream!(filters, agent, &block)
-  stream = Twitter::JSONStream.connect(
-    :path    => "/1/statuses/#{(filters && filters.length > 0) ? 'filter' : 'sample'}.json#{"?track=#{filters.map {|f| CGI::escape(f) }.join(",")}" if filters && filters.length > 0}",
-    :ssl     => true,
-    :oauth   => {
-      :consumer_key    => agent.twitter_consumer_key,
-      :consumer_secret => agent.twitter_consumer_secret,
-      :access_key      => agent.twitter_oauth_token,
-      :access_secret   => agent.twitter_oauth_token_secret
-    }
-  )
-
-  stream.each_item do |status|
-    status = JSON.parse(status) if status.is_a?(String)
-    next unless status
-    next if status.has_key?('delete')
-    next unless status['text']
-    status['text'] = status['text'].gsub(/&lt;/, "<").gsub(/&gt;/, ">").gsub(/[\t\n\r]/, '  ')
-    block.call(status)
-  end
-
-  stream.on_error do |message|
-    STDERR.puts " --> Twitter error: #{message} <--"
-  end
-
-  stream.on_no_data do |message|
-    STDERR.puts " --> Got no data for awhile; trying to reconnect."
-    EventMachine::stop_event_loop
-  end
-
-  stream.on_max_reconnects do |timeout, retries|
-    STDERR.puts " --> Oops, tried too many times! <--"
-    EventMachine::stop_event_loop
-  end
-end
-
-def load_and_run(agents)
-  agents.group_by { |agent| agent.twitter_oauth_token }.each do |oauth_token, agents|
-    filter_to_agent_map = agents.map { |agent| agent.options[:filters] }.flatten.uniq.compact.map(&:strip).inject({}) { |m, f| m[f] = []; m }
-
-    agents.each do |agent|
-      agent.options[:filters].flatten.uniq.compact.map(&:strip).each do |filter|
-        filter_to_agent_map[filter] << agent
-      end
-    end
-
-    recent_tweets = []
-
-    stream!(filter_to_agent_map.keys, agents.first) do |status|
-      if status["retweeted_status"].present? && status["retweeted_status"].is_a?(Hash)
-        puts "Skipping retweet: #{status["text"]}"
-      elsif recent_tweets.include?(status["id_str"])
-        puts "Skipping duplicate tweet: #{status["text"]}"
-      else
-        recent_tweets << status["id_str"]
-        recent_tweets.shift if recent_tweets.length > DUPLICATE_DETECTION_LENGTH
-        puts status["text"]
-        filter_to_agent_map.keys.each do |filter|
-          if (filter.downcase.split(SEPARATOR) - status["text"].downcase.split(SEPARATOR)).reject(&:empty?) == [] # Hacky McHackerson
-            filter_to_agent_map[filter].each do |agent|
-              puts " -> #{agent.name}"
-              agent.process_tweet(filter, status)
-            end
-          end
-        end
-      end
-    end
-  end
-end
-
-RELOAD_TIMEOUT = 10.minutes
-DUPLICATE_DETECTION_LENGTH = 1000
-SEPARATOR = /[^\w_\-]+/
-
-while true
-  begin
-    agents = Agents::TwitterStreamAgent.all
-
-    EventMachine::run do
-      EventMachine.add_periodic_timer(RELOAD_TIMEOUT) {
-        puts "Reloading EventMachine and all Agents..."
-        EventMachine::stop_event_loop
-      }
-
-      if agents.length == 0
-        puts "No agents found.  Will look again in a minute."
-        sleep 60
-        EventMachine::stop_event_loop
-      else
-        puts "Found #{agents.length} agent(s).  Loading them now..."
-        load_and_run agents
-      end
-    end
-
-    print "Pausing..."; STDOUT.flush
-    sleep 1
-    puts "done."
-  rescue SignalException, SystemExit
-    EventMachine::stop_event_loop if EventMachine.reactor_running?
-    exit
-  rescue StandardError => e
-    STDERR.puts "\nException #{e.message}:\n#{e.backtrace.join("\n")}\n\n"
-    STDERR.puts "Waiting for a couple of minutes..."
-    sleep 120
-  end
-end
+TwitterStream.new.run

+ 1 - 0
config/initializers/delayed_job.rb

@@ -1,6 +1,7 @@
 Delayed::Worker.destroy_failed_jobs = true
 Delayed::Worker.max_attempts = 5
 Delayed::Worker.max_run_time = 20.minutes
+Delayed::Worker.read_ahead = 5
 Delayed::Worker.default_priority = 10
 Delayed::Worker.delay_jobs = !Rails.env.test?
 

+ 1 - 3
deployment/site-cookbooks/huginn_production/files/default/Procfile

@@ -1,4 +1,2 @@
 web: sudo bundle exec unicorn_rails -c config/unicorn.rb -E production
-schedule: sudo RAILS_ENV=production bundle exec rails runner bin/schedule.rb
-twitter: sudo RAILS_ENV=production bundle exec rails runner bin/twitter_stream.rb
-dj: sudo RAILS_ENV=production bundle exec script/delayed_job run
+jobs: sudo RAILS_ENV=production bundle exec rails runner bin/threaded.rb

+ 87 - 0
lib/huginn_scheduler.rb

@@ -0,0 +1,87 @@
+require 'rufus/scheduler'
+
+class HuginnScheduler
+  attr_accessor :mutex
+
+  def initialize
+    @rufus_scheduler = Rufus::Scheduler.new
+  end
+
+  def stop
+    @rufus_scheduler.stop
+  end
+
+  def run_schedule(time)
+    with_mutex do
+      puts "Queuing schedule for #{time}"
+      Agent.delay.run_schedule(time)
+    end
+  end
+
+  def propagate!
+    with_mutex do
+      puts "Queuing event propagation"
+      Agent.delay.receive!
+    end
+  end
+
+  def cleanup_expired_events!
+    with_mutex do
+      puts "Running event cleanup"
+      Event.delay.cleanup_expired!
+    end
+  end
+
+  def with_mutex
+    ActiveRecord::Base.connection_pool.with_connection do
+      mutex.synchronize do
+        yield
+      end
+    end
+  end
+
+  def run!
+    self.mutex = Mutex.new
+
+    tzinfo_friendly_timezone = ActiveSupport::TimeZone::MAPPING[ENV['TIMEZONE'].present? ? ENV['TIMEZONE'] : "Pacific Time (US & Canada)"]
+
+    # Schedule event propagation.
+
+    @rufus_scheduler.every '1m' do
+      propagate!
+    end
+
+    # Schedule event cleanup.
+
+    @rufus_scheduler.cron "0 0 * * * " + tzinfo_friendly_timezone do
+      cleanup_expired_events!
+    end
+
+    # Schedule repeating events.
+
+    %w[1m 2m 5m 10m 30m 1h 2h 5h 12h 1d 2d 7d].each do |schedule|
+      @rufus_scheduler.every schedule do
+        run_schedule "every_#{schedule}"
+      end
+    end
+
+    # Schedule events for specific times.
+
+    # Times are assumed to be in PST for now.  Can store a user#timezone later.
+    24.times do |hour|
+      @rufus_scheduler.cron "0 #{hour} * * * " + tzinfo_friendly_timezone do
+        if hour == 0
+          run_schedule "midnight"
+        elsif hour < 12
+          run_schedule "#{hour}am"
+        elsif hour == 12
+          run_schedule "noon"
+        else
+          run_schedule "#{hour - 12}pm"
+        end
+      end
+    end
+
+    @rufus_scheduler.join
+  end
+end
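
The cron entries above append a Rails-style zone name resolved through ActiveSupport::TimeZone::MAPPING, which rufus-scheduler accepts as a trailing timezone field in the cron string. Roughly what gets generated with the default TIMEZONE (illustrative values, not taken from the diff):

    tz = ActiveSupport::TimeZone::MAPPING["Pacific Time (US & Canada)"]  # => "America/Los_Angeles"
    "0 0 * * * "  + tz  # daily event cleanup at midnight Pacific
    "0 17 * * * " + tz  # the "5pm" schedule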

+ 125 - 0
lib/twitter_stream.rb

@@ -0,0 +1,125 @@
+require 'cgi'
+require 'json'
+require 'twitter/json_stream'
+require 'em-http-request'
+require 'pp'
+
+class TwitterStream
+  def initialize
+    @running = true
+  end
+
+  def stop
+    @running = false
+    EventMachine::stop_event_loop if EventMachine.reactor_running?
+  end
+
+  def stream!(filters, agent, &block)
+    stream = Twitter::JSONStream.connect(
+      :path    => "/1/statuses/#{(filters && filters.length > 0) ? 'filter' : 'sample'}.json#{"?track=#{filters.map {|f| CGI::escape(f) }.join(",")}" if filters && filters.length > 0}",
+      :ssl     => true,
+      :oauth   => {
+        :consumer_key    => agent.twitter_consumer_key,
+        :consumer_secret => agent.twitter_consumer_secret,
+        :access_key      => agent.twitter_oauth_token,
+        :access_secret   => agent.twitter_oauth_token_secret
+      }
+    )
+
+    stream.each_item do |status|
+      status = JSON.parse(status) if status.is_a?(String)
+      next unless status
+      next if status.has_key?('delete')
+      next unless status['text']
+      status['text'] = status['text'].gsub(/&lt;/, "<").gsub(/&gt;/, ">").gsub(/[\t\n\r]/, '  ')
+      block.call(status)
+    end
+
+    stream.on_error do |message|
+      STDERR.puts " --> Twitter error: #{message} <--"
+    end
+
+    stream.on_no_data do |message|
+      STDERR.puts " --> Got no data for awhile; trying to reconnect."
+      EventMachine::stop_event_loop
+    end
+
+    stream.on_max_reconnects do |timeout, retries|
+      STDERR.puts " --> Oops, tried too many times! <--"
+      EventMachine::stop_event_loop
+    end
+  end
+
+  def load_and_run(agents)
+    agents.group_by { |agent| agent.twitter_oauth_token }.each do |oauth_token, agents|
+      filter_to_agent_map = agents.map { |agent| agent.options[:filters] }.flatten.uniq.compact.map(&:strip).inject({}) { |m, f| m[f] = []; m }
+
+      agents.each do |agent|
+        agent.options[:filters].flatten.uniq.compact.map(&:strip).each do |filter|
+          filter_to_agent_map[filter] << agent
+        end
+      end
+
+      recent_tweets = []
+
+      stream!(filter_to_agent_map.keys, agents.first) do |status|
+        if status["retweeted_status"].present? && status["retweeted_status"].is_a?(Hash)
+          puts "Skipping retweet: #{status["text"]}"
+        elsif recent_tweets.include?(status["id_str"])
+          puts "Skipping duplicate tweet: #{status["text"]}"
+        else
+          recent_tweets << status["id_str"]
+          recent_tweets.shift if recent_tweets.length > DUPLICATE_DETECTION_LENGTH
+          puts status["text"]
+          filter_to_agent_map.keys.each do |filter|
+            if (filter.downcase.split(SEPARATOR) - status["text"].downcase.split(SEPARATOR)).reject(&:empty?) == [] # Hacky McHackerson
+              filter_to_agent_map[filter].each do |agent|
+                puts " -> #{agent.name}"
+                agent.process_tweet(filter, status)
+              end
+            end
+          end
+        end
+      end
+    end
+  end
+
+  RELOAD_TIMEOUT = 10.minutes
+  DUPLICATE_DETECTION_LENGTH = 1000
+  SEPARATOR = /[^\w_\-]+/
+
+  def run
+    while @running
+      begin
+        agents = Agents::TwitterStreamAgent.all
+
+        EventMachine::run do
+          EventMachine.add_periodic_timer(RELOAD_TIMEOUT) {
+            puts "Reloading EventMachine and all Agents..."
+            EventMachine::stop_event_loop
+          }
+
+          if agents.length == 0
+            puts "No agents found.  Will look again in a minute."
+            sleep 60
+            EventMachine::stop_event_loop
+          else
+            puts "Found #{agents.length} agent(s).  Loading them now..."
+            load_and_run agents
+          end
+        end
+
+        print "Pausing..."; STDOUT.flush
+        sleep 1
+        puts "done."
+      rescue SignalException, SystemExit
+        @running = false
+        EventMachine::stop_event_loop if EventMachine.reactor_running?
+      rescue StandardError => e
+        STDERR.puts "\nException #{e.message}:\n#{e.backtrace.join("\n")}\n\n"
+        STDERR.puts "Waiting for a couple of minutes..."
+        sleep 120
+      end
+    end
+  end
+end
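
Wrapping the old script in a class is what makes the stream embeddable in bin/threaded.rb: run keeps looping while @running is true, and stop clears the flag and halts the EventMachine reactor so the current iteration can finish. A rough usage sketch from a hosting thread (hypothetical driver code, not part of the PR):

    stream = TwitterStream.new
    worker = Thread.new { stream.run }  # blocks inside EventMachine::run
    # ... later, e.g. from a signal handler:
    stream.stop                         # @running = false; stops the reactor
    worker.join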